---
title: "DataStream API"
sidebar_position: 6
---

# DataStream API
## Overview
The Fluss DataStream Connector for Apache Flink provides a Flink DataStream source implementation for reading data from Fluss tables and a Flink DataStream sink implementation for writing data to Fluss tables. It allows you to seamlessly integrate Fluss tables with Flink's DataStream API, enabling you to process data from Fluss in your Flink applications.

Key features of the Fluss DataStream Connector include:
* Reading from both primary key tables and log tables
* Support for projection pushdown to select specific fields
* Flexible offset initialization strategies
* Custom de/serialization schemas for converting between Fluss records and your data types
* Writing to both primary key tables and log tables
* Support for different operation types (`INSERT`, `UPDATE`, `DELETE`)
* Configurable sink behavior with custom options
* Automatic handling of upserts for primary key tables

## Dependency
In order to use the Fluss DataStream Connector in Flink DataStream API, you need to add the following dependency to your `pom.xml` file,
according to the Flink version you are using. The Fluss DataStream Connector is available for Flink versions 1.18, 1.19, and 1.20.


import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

<Tabs>
    <TabItem value="flink-1.20" label="Flink 1.20" default>
        ```xml
        <!-- https://mvnrepository.com/artifact/org.apache.fluss/fluss-flink-1.20 -->
        <dependency>
            <groupId>org.apache.fluss</groupId>
            <artifactId>fluss-flink-1.20</artifactId>
            <version>$FLUSS_VERSION$</version>
        </dependency>
        ```
    </TabItem>
    <TabItem value="flink-1.19" label="Flink 1.19">
        ```xml
        <!-- https://mvnrepository.com/artifact/org.apache.fluss/fluss-flink-1.19 -->
        <dependency>
            <groupId>org.apache.fluss</groupId>
            <artifactId>fluss-flink-1.19</artifactId>
            <version>$FLUSS_VERSION$</version>
        </dependency>
        ```
    </TabItem>
    <TabItem value="flink-1.18" label="Flink 1.18">
        ```xml
        <!-- https://mvnrepository.com/artifact/org.apache.fluss/fluss-flink-1.18 -->
        <dependency>
            <groupId>org.apache.fluss</groupId>
            <artifactId>fluss-flink-1.18</artifactId>
            <version>$FLUSS_VERSION$</version>
        </dependency>
        ```
    </TabItem>
</Tabs>



## DataStream Source
### Initialization
The main entry point for the Fluss DataStream API is the `FlussSource` class. You create a `FlussSource` instance using the builder pattern, which allows for step-by-step configuration of the source connector.

```java
// Create a FlussSource using the builder pattern
FlussSource<Order> flussSource = FlussSource.<Order>builder()
    .setBootstrapServers("localhost:9123")
    .setDatabase("mydb")
    .setTable("orders")
    .setProjectedFields("orderId", "amount")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setScanPartitionDiscoveryIntervalMs(1000L)
    .setDeserializationSchema(new OrderDeserializationSchema())
    .build();

DataStreamSource<Order> stream =
        env.fromSource(flussSource, WatermarkStrategy.noWatermarks(), "Fluss Orders Source");

stream.print();
```

### Configuration Options
The `FlussSourceBuilder` provides several methods for configuring the source connector:

#### Required Parameters
* **setBootstrapServers(String bootstrapServers):** Sets the bootstrap servers for the Fluss source connection
* **setDatabase(String database):** Sets the database name for the Fluss source
* **setTable(String table):** Sets the table name for the Fluss source
* **setDeserializationSchema(FlussDeserializationSchema&lt;T&gt; schema):** Sets the deserialization schema for converting Fluss records to output records

#### Optional Parameters
* **setProjectedFields(String... projectedFieldNames):** Sets the fields to project from the table (if not specified, all fields are included)
* **setScanPartitionDiscoveryIntervalMs(long intervalMs):** Sets the interval for discovering new partitions (default: from configuration)
* **setStartingOffsets(OffsetsInitializer initializer):** Sets the strategy for determining starting offsets (default: `OffsetsInitializer.full()`)
* **setFlussConfig(Configuration flussConf):** Sets custom Fluss configuration properties

### Offset Initializers
The `OffsetsInitializer` interface provides several factory methods for creating different types of initializers:

* **OffsetsInitializer.earliest():** Initializes offsets to the earliest available offsets of each bucket
* **OffsetsInitializer.latest():** Initializes offsets to the latest offsets of each bucket
* **OffsetsInitializer.full():** Performs a full snapshot on the table upon first startup:
  * For **log tables:** reads from the earliest log offset (equivalent to earliest())
  * For **primary key tables:** reads the latest snapshot which materializes all changes on the table
* **OffsetsInitializer.timestamp(long timestamp):** Initializes offsets based on a given timestamp

**Example:**
```java
// Start reading from the earliest available offsets
FlussSource<Order> source = FlussSource.<Order>builder()
    .setStartingOffsets(OffsetsInitializer.earliest())
    // other configuration...
    .build();

// Start reading from the latest offsets
FlussSource<Order> source = FlussSource.<Order>builder()
    .setStartingOffsets(OffsetsInitializer.latest())
    // other configuration...
    .build();

// Start reading from a specific timestamp
FlussSource<Order> source = FlussSource.<Order>builder()
    .setStartingOffsets(OffsetsInitializer.timestamp(System.currentTimeMillis() - 3600 * 1000))
    // other configuration...
    .build();
```

### Deserialization Schemas
The `FlussDeserializationSchema` interface is used to convert Fluss records to your desired output type. Fluss provides some built-in implementations:

* **RowDataDeserializationSchema** - Converts Fluss records to Flink's `RowData` objects
* **JsonStringDeserializationSchema** - Converts Fluss records to JSON strings

You can also implement your own deserialization schema by implementing the `FlussDeserializationSchema` interface:

```java
public class OrderDeserializationSchema implements FlussDeserializationSchema<Order> {
    @Override
    public void open(InitializationContext context) throws Exception {
        // Initialization code if needed
    }

    @Override
    public Order deserialize(LogRecord record) throws Exception {
        InternalRow row = record.getRow();

        // Extract fields from the row
        long orderId = row.getLong(0);
        long itemId = row.getLong(1);
        int amount = row.getInt(2);
        String address = row.getString(3).toString();

        // Create and return your custom object
        return new Order(orderId, itemId, amount, address);
    }

    @Override
    public TypeInformation<Order> getProducedType(RowType rowSchema) {
        return TypeInformation.of(Order.class);
    }
}
```

### Examples

#### Reading from a Primary Key Table
When reading from a primary key table, the Fluss DataStream Connector automatically handles updates to the data.
For each update, it emits both the before and after versions of the record with the appropriate `RowKind` (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`).

```java
// Create a FlussSource for a primary key table
FlussSource<RowData> flussSource = FlussSource.<RowData>builder()
    .setBootstrapServers("localhost:9123")
    .setDatabase("mydb")
    .setTable("orders_pk")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializationSchema(new RowDataDeserializationSchema())
    .build();

// Create a DataStream from the FlussSource
DataStreamSource<RowData> stream = env.fromSource(
    flussSource,
    WatermarkStrategy.noWatermarks(),
    "Fluss PK Source"
);

// Process the stream to handle different row kinds
// For INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE events
```

**Note:** If you are mapping from `RowData` to your POJO objects, you might want to include the row kind operation.

#### Reading from a Log Table
When reading from a log table, all records are emitted with `RowKind.INSERT` since log tables only support appends.

```java
// Create a FlussSource for a log table
FlussSource<RowData> flussSource = FlussSource.<RowData>builder()
    .setBootstrapServers("localhost:9123")
    .setDatabase("mydb")
    .setTable("orders_log")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializationSchema(new RowDataDeserializationSchema())
    .build();

// Create a DataStream from the FlussSource
DataStreamSource<RowData> stream = env.fromSource(
    flussSource,
    WatermarkStrategy.noWatermarks(),
    "Fluss Log Source"
);
```

#### Using Projection Pushdown
Projection pushdown allows you to select only the fields you need, which can improve performance by reducing the amount of data transferred.

```java
// Create a FlussSource with projection pushdown
FlussSource<OrderPartial> flussSource = FlussSource.<OrderPartial>builder()
    .setBootstrapServers("localhost:9123")
    .setDatabase("mydb")
    .setTable("orders")
    .setProjectedFields("orderId", "amount")  // Only select these fields
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializationSchema(new OrderPartialDeserializationSchema())
    .build();

// Create a DataStream from the FlussSource
DataStreamSource<OrderPartial> stream = env.fromSource(
    flussSource,
    WatermarkStrategy.noWatermarks(),
    "Fluss Source with Projection"
);
```

In this example, `OrderPartial` is a class that only contains the `orderId` and `amount` fields, and `OrderPartialDeserializationSchema` is a deserialization schema that knows how to convert the projected fields to `OrderPartial` objects.

## DataStream Sink

### Initialization
The main entry point for the Fluss DataStream Sink API is the `FlussSink` class. You create a `FlussSink` instance using the `FlussSinkBuilder`, which allows for step-by-step configuration of the sink connector.

```java
FlussSink<RowData> flussSink =
        FlussSink.<RowData>builder()
                .setBootstrapServers("localhost:9123")
                .setDatabase("mydb")
                .setTable("orders")
                .setSerializationSchema(new RowDataSerializationSchema(false, true))
                .build();

stream.sinkTo(flussSink).name("Fluss Sink");
```

### Configuration Options
The `FlussSinkBuilder` provides several methods for configuring the sink connector:

#### Required Parameters
* **setBootstrapServers(String bootstrapServers):** Sets the bootstrap servers for the Fluss sink connection
* **setDatabase(String database):** Sets the database name for the Fluss sink
* **setTable(String table):** Sets the table name for the Fluss sink
* **setSerializationSchema(FlussSerializationSchema&lt;T&gt; schema):** Sets the serialization schema for converting input records to Fluss records

#### Optional Parameters
* **setShuffleByBucketId(boolean shuffleByBucketId):** Sets whether to shuffle data by bucket ID (default: true)
* **setOption(String key, String value):** Sets a single configuration option
* **setOptions(Map&lt;String, String&gt; options):** Sets multiple configuration options at once

**Note:** `FlussSerializationSchema` needs to propagate downstream the operations that take place. See [RowDataSerializationSchema](https://github.com/apache/fluss/blob/main/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java) as an example.

### Examples

#### Writing to a Primary Key Table
When writing to a primary key table, the Fluss DataStream Connector automatically handles upserts based on the primary key.

```java
// Create a FlussSink for a primary key table
FlussSink<Order> flussSink = FlussSink.<Order>builder()
                .setBootstrapServers("localhost:9123")
                .setDatabase("mydb")
                .setTable("orders_pk")
                .setSerializationSchema(new OrderSerializationSchema())
                .build();

// Add the sink to your DataStream
dataStream.sinkTo(flussSink);
```

#### Writing to a Log Table
When writing to a log table, all records are appended.

```java
// Create a FlussSink for a log table
FlussSink<Order> flussSink = FlussSink.<Order>builder()
                .setBootstrapServers("localhost:9123")
                .setDatabase("mydb")
                .setTable("orders_log")
                .setSerializationSchema(new OrderSerializationSchema())
                .build();

// Add the sink to your DataStream
dataStream.sinkTo(flussSink);
```

#### Setting Custom Configuration Options
You can set custom configuration options for the Fluss sink.

```java
// Create a FlussSink with custom configuration options
FlussSink<Order> flussSink = FlussSink.<Order>builder()
                .setBootstrapServers("localhost:9123")
                .setDatabase("mydb")
                .setTable("orders")
                .setOption("custom.key", "custom.value")
                .setSerializationSchema(new OrderSerializationSchema())
                .build();

// Or set multiple options at once
Map<String, String> options = new HashMap<>();
options.put("option1", "value1");
options.put("option2", "value2");

FlussSink<Order> flussSink = FlussSink.<Order>builder()
        .setBootstrapServers("localhost:9123")
        .setDatabase("mydb")
        .setTable("orders")
        .setOptions(options)
        .setSerializationSchema(new OrderSerializationSchema())
        .build();
```

### Serialization Schemas
The `FlussSerializationSchema` interface is used to convert your data objects to Fluss's internal row format for writing to Fluss tables. Fluss provides built-in implementations:

* **RowDataSerializationSchema** - Converts Flink's `RowData` objects to Fluss rows
* **JsonStringSerializationSchema** - Converts JSON strings to Fluss rows

The serialization schema is used when writing data to Fluss tables using the Fluss sink. When configuring a Fluss sink, you provide a serialization schema that converts your data objects to Fluss's internal row format. The serialization schema is set using the `setSerializationSchema()` method on the sink builder.

You can implement your own serialization schema by implementing the `FlussSerializationSchema` interface:

```java
private static class Order implements Serializable {
  private static final long serialVersionUID = 1L;
  private final long orderId;
  private final long itemId;
  private final int amount;
  private final String address;
  private final RowKind rowKind; // holds the row operation

  ...
}

private static class OrderSerializationSchema
            implements FlussSerializationSchema<Order> {
        private static final long serialVersionUID = 1L;

        @Override
        public void open(InitializationContext context) throws Exception {}

        @Override
        public RowWithOp serialize(Order value) throws Exception {
            GenericRow row = new GenericRow(4);
            row.setField(0, value.orderId);
            row.setField(1, value.itemId);
            row.setField(2, value.amount);
            row.setField(3, BinaryString.fromString(value.address));

            RowKind rowKind = value.rowKind;
            switch (rowKind) {
                case INSERT:
                case UPDATE_AFTER:
                    return new RowWithOp(row, OperationType.UPSERT);
                case UPDATE_BEFORE:
                case DELETE:
                    return new RowWithOp(row, OperationType.DELETE);
                default:
                    throw new IllegalArgumentException("Unsupported row kind: " + rowKind);
            }
        }
    }
```


By default you can use the [RowDataSerializationSchema](https://github.com/apache/fluss/blob/main/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java).

The `RowDataSerializationSchema` provides additional configuration options:

* **isAppendOnly** - Whether the schema operates in append-only mode (only INSERT operations)
* **ignoreDelete** - Whether to ignore `DELETE` and `UPDATE_BEFORE` operations

```java
// Create a serialization schema for append-only operations
RowDataSerializationSchema schema = new RowDataSerializationSchema(true, false);

// Create a serialization schema that handles all operation types
RowDataSerializationSchema schema = new RowDataSerializationSchema(false, false);

// Create a serialization schema that ignores DELETE operations
RowDataSerializationSchema schema = new RowDataSerializationSchema(false, true);
```
